/*
General-purpose helper functions for time, uuid, and similar values.
*/

use snowflake::SnowflakeIdGenerator;
use once_cell::sync::OnceCell;
use std::sync::RwLock;
use crate::conf_init::normal_conf;
use crate::data_frame::{value::{RealValue}};
use crate::physical_plan::PhysicalPlanExprArg;
use crate::run_plan::HandResult;
use chrono::{Local, NaiveDateTime, LocalResult};
use crate::data_frame::single_data::SingleData;
use chrono_tz::Tz;
use crate::chrono::TimeZone;
use crate::lazy_static::__Deref;
use crate::physical_plan::filter::{is_ipv4,is_ipv6};

use super::element::get_element;
use crate::physical_plan::window::SortByValue;

/// Shared Snowflake id generator used by `uuid` and `uuid_single`.
///
/// The cell starts empty and is filled by the first caller through
/// `get_or_init`, using the machine id and node id read from `normal_conf()`.
/// Callers obtain ids through the lock's write guard.
static SNOW_FLAKE: OnceCell<RwLock<SnowflakeIdGenerator>> = OnceCell::new();

/// Returns a unique id generated with the Snowflake algorithm, wrapped as
/// `RealValue::U64`.
///
/// Takes no expression arguments; passing any yields `HandResult::Err(())`.
/// The shared generator is created on first use from the machine id and node
/// id found in `normal_conf()`.
///
/// # Panics
///
/// Panics if the lock guarding the shared generator is poisoned.
#[inline]
pub fn uuid(args: &[Box<PhysicalPlanExprArg>], _data: &mut SingleData) -> HandResult {
    if !args.is_empty() {
        return HandResult::Err(());
    }

    let generator = SNOW_FLAKE.get_or_init(|| {
        let conf = normal_conf();
        let machine_id = conf.snowflake_machine_id as i32;
        let node_id = conf.snowflake_node_id as i32;
        RwLock::new(SnowflakeIdGenerator::new(machine_id, node_id))
    });

    // Keep the write guard scoped to the single `generate` call.
    let raw_id = {
        let mut guard = generator.write().unwrap();
        guard.generate()
    };

    HandResult::Ok(RealValue::U64(raw_id as u64))
}
/// Returns the next id from the shared Snowflake generator as
/// `RealValue::U64`.
///
/// Takes no expression arguments; passing any yields `HandResult::Err(())`.
/// The generator is initialised lazily from `normal_conf()` (machine id and
/// node id) the first time an id is requested.
///
/// # Panics
///
/// Panics if the lock guarding the shared generator is poisoned.
#[inline]
pub fn uuid_single(args: &[Box<PhysicalPlanExprArg>], _data: &mut SingleData) -> HandResult {
    match args.len() {
        0 => {}
        _ => return HandResult::Err(()),
    }

    let build_generator = || {
        let settings = normal_conf();
        RwLock::new(SnowflakeIdGenerator::new(
            settings.snowflake_machine_id as i32,
            settings.snowflake_node_id as i32,
        ))
    };
    let shared = SNOW_FLAKE.get_or_init(build_generator);

    let next = shared.write().unwrap().generate();
    HandResult::Ok(RealValue::U64(next as u64))
}
/// Returns the current time as a `RealValue::DateTime` expressed in the
/// `Asia/Shanghai` time zone.
///
/// Takes no expression arguments; passing any yields `HandResult::Err(())`.
#[inline]
pub fn time_now(args: &[Box<PhysicalPlanExprArg>], _data: &mut SingleData) -> HandResult {
    if !args.is_empty() {
        return HandResult::Err(());
    }

    // `Local::now()` already identifies an absolute instant, so it only needs
    // to be re-expressed in the target zone. Do not rebuild it from
    // `timestamp()` with `from_local_datetime`: the epoch seconds are UTC, and
    // reading the resulting UTC wall-clock fields as Shanghai local time shifts
    // the instant by the zone's UTC offset.
    let now = Local::now().with_timezone(&Tz::Asia__Shanghai);

    HandResult::Ok(RealValue::DateTime(now))
}

/// Formats a datetime value as text in one of the supported standard formats.
///
/// Expects exactly two arguments:
/// * `args[0]`: a `PhysicalPlanExprArg::RealValue` holding a
///   `RealValue::DateTime`.
/// * `args[1]`: a `PhysicalPlanExprArg::ConfValue` whose value renders (via
///   `to_string`) as `"rfc3339"` or `"rfc2822"`.
///
/// Returns `RealValue::Utf8` with the formatted text. Returns
/// `HandResult::Err(())` for a wrong argument count, wrong argument kinds, a
/// value that is not a datetime, or an unsupported format name.
#[inline]
pub fn datetime_format(args: &[Box<PhysicalPlanExprArg>], _data: &mut SingleData) -> HandResult {
    if args.len() != 2 {
        return HandResult::Err(());
    }

    let (value, format_conf) = match (args[0].deref(), args[1].deref()) {
        (PhysicalPlanExprArg::RealValue(value), PhysicalPlanExprArg::ConfValue(conf)) => {
            (value, conf)
        }
        _ => return HandResult::Err(()),
    };

    // Every supported format needs a datetime, so check that once up front
    // instead of repeating the check per format.
    let timestamp = match value {
        RealValue::DateTime(t) => t,
        _ => return HandResult::Err(()),
    };

    let formatted = match format_conf.value.deref().to_string().as_str() {
        "rfc3339" => timestamp.to_rfc3339(),
        "rfc2822" => timestamp.to_rfc2822(),
        _ => return HandResult::Err(()),
    };

    HandResult::Ok(RealValue::Utf8(formatted))
}

/// Converts a value to the type named by a configuration argument.
///
/// Expects exactly two arguments:
/// * `args[0]`: a `PhysicalPlanExprArg::RealValue`, the value to convert.
/// * `args[1]`: a `PhysicalPlanExprArg::ConfValue` whose value, rendered with
///   `to_string`, is passed to `RealValue::convert` as the target type.
///
/// Returns the converted value. Returns `HandResult::Err(())` for a wrong
/// argument count, wrong argument kinds, or when the conversion produces
/// `RealValue::None`.
#[inline]
pub fn realtype_convert(args: &[Box<PhysicalPlanExprArg>], _data: &mut SingleData) -> HandResult {
    if args.len() != 2 {
        return HandResult::Err(());
    }

    let source = match args[0].deref() {
        PhysicalPlanExprArg::RealValue(value) => value,
        _ => return HandResult::Err(()),
    };
    let target = match args[1].deref() {
        PhysicalPlanExprArg::ConfValue(conf) => conf,
        _ => return HandResult::Err(()),
    };

    // `RealValue::None` from `convert` is treated as a failed conversion.
    match source.convert(target.value.to_string()) {
        RealValue::None => HandResult::Err(()),
        converted => HandResult::Ok(converted),
    }
}

/// Reports the IP version of the addressed element as `RealValue::U8`.
///
/// Yields `4` when `is_ipv4` returns `Ok(Boolean(true))`, otherwise `6` when
/// `is_ipv6` does, otherwise `0`. `is_ipv6` is only consulted when the IPv4
/// check did not succeed. This function always returns `Ok`; errors from the
/// two checks simply count as "no match".
#[inline]
pub fn get_ip_version_element(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    let confirmed = HandResult::Ok(RealValue::Boolean(true));

    let version: u8 = if is_ipv4(args, data) == confirmed {
        4
    } else if is_ipv6(args, data) == confirmed {
        6
    } else {
        0
    };

    HandResult::Ok(RealValue::U8(version))
}

/// Looks up an element with `get_element` and integer-divides it by 1000.
///
/// NOTE(review): the name suggests a milliseconds-to-seconds conversion;
/// confirm the unit against callers.
///
/// Returns `RealValue::U64` with the quotient, or `HandResult::Err(())` when
/// the lookup fails or the element is not a `RealValue::U64`.
pub fn add_datetime_ms2s_element(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    match get_element(args, data) {
        HandResult::Ok(RealValue::U64(millis)) => HandResult::Ok(RealValue::U64(millis / 1000)),
        _ => HandResult::Err(()),
    }
}

/// Builds a textual VXLAN label from the element keys given in `args`.
///
/// Every argument must be a `PhysicalPlanExprArg::ElementValue` whose looked-up
/// value is a `RealValue::U64`; anything else yields `HandResult::Err(())`.
/// Values of `0` are skipped. For non-zero values, keyed on the first path
/// segment of the element:
/// * `vxlan_vni` replaces the label with `vxlan<value>`;
/// * `vxlan_flags` appends ` flags<value>` to the current label;
/// * any other key contributes nothing.
///
/// Returns the label as `RealValue::Utf8` (empty if nothing contributed).
pub fn trans_vxlan_element(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    let mut label = String::new();

    for arg in args {
        let key = match arg.deref() {
            PhysicalPlanExprArg::ElementValue(key) => key,
            PhysicalPlanExprArg::RealValue(_) | PhysicalPlanExprArg::ConfValue(_) => {
                return HandResult::Err(());
            }
        };

        let value = match data.get_key_value_always(key) {
            RealValue::U64(value) => value,
            _ => return HandResult::Err(()),
        };
        if value == 0 {
            continue;
        }

        if key.element[0] == "vxlan_vni" {
            label = format!("vxlan{}", value);
        } else if key.element[0] == "vxlan_flags" {
            label.push_str(&format!(" flags{}", value));
        }
    }

    HandResult::Ok(RealValue::Utf8(label))
}

/// Builds a comma-separated VLAN label from the element keys given in `args`.
///
/// Every argument must be a `PhysicalPlanExprArg::ElementValue` whose looked-up
/// value is a `RealValue::U64`; anything else yields `HandResult::Err(())`.
/// Each non-zero value contributes `vlan<value>`; contributions are joined
/// with `,` in argument order, and zero values are skipped.
///
/// Returns the label as `RealValue::Utf8` (empty if nothing contributed).
pub fn trans_vlan_element(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    let mut label = String::new();

    for arg in args {
        let key = match arg.deref() {
            PhysicalPlanExprArg::ElementValue(key) => key,
            PhysicalPlanExprArg::RealValue(_) | PhysicalPlanExprArg::ConfValue(_) => {
                return HandResult::Err(());
            }
        };

        let vlan_id = match data.get_key_value_always(key) {
            RealValue::U64(vlan_id) => vlan_id,
            _ => return HandResult::Err(()),
        };
        if vlan_id == 0 {
            continue;
        }

        // Separate entries with a comma, but never lead with one.
        if !label.is_empty() {
            label.push(',');
        }
        label.push_str(&format!("vlan{}", vlan_id));
    }

    HandResult::Ok(RealValue::Utf8(label))
}
/// Returns the name of the work space the current event belongs to, read from
/// `data.metadata.work_space`, as `RealValue::Utf8`.
///
/// Takes no expression arguments; passing any yields `HandResult::Err(())`.
pub fn work_space_name(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    if !args.is_empty() {
        return HandResult::Err(());
    }

    // Hand back an owned copy so the event metadata stays untouched.
    let name = data.metadata.work_space.as_ref().clone();
    HandResult::Ok(RealValue::Utf8(name))
}
/// Returns the name of the work node the current event belongs to, read from
/// `data.metadata.work_node`, as `RealValue::Utf8`.
///
/// Takes no expression arguments; passing any yields `HandResult::Err(())`.
pub fn work_node_name(args: &[Box<PhysicalPlanExprArg>], data: &mut SingleData) -> HandResult {
    match args.len() {
        0 => {
            // Hand back an owned copy so the event metadata stays untouched.
            let name = data.metadata.work_node.as_ref().clone();
            HandResult::Ok(RealValue::Utf8(name))
        }
        _ => HandResult::Err(()),
    }
}
